use memchr::memchr;
use std::char;
use std::fs::File;
use std::io::{self, BufRead, Seek};
use std::iter;
use std::path::Path;
use std::slice;
use std::str::{self, Utf8Error};
use buffer_redux;
use super::policy::{BufPolicy, StdPolicy};
use super::*;
use std::error::Error as StdError;
/// Buffer policy used by readers created with `Reader::new` / `Reader::with_capacity`.
type DefaultBufPolicy = StdPolicy;
/// Initial buffer capacity used by `Reader::new` (64 KiB).
const BUFSIZE: usize = 64 * 1024;
/// The line of a FASTQ record whose terminating newline had not been found
/// yet when a record search ran out of buffered data.
///
/// Variants are ordered as the lines appear in a record; the discriminant is
/// also used as the line offset from the record start in error positions.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
enum RecordPos {
    /// Header (`@...`) line.
    Head = 0,
    /// Sequence line.
    Seq = 1,
    /// Separator (`+`) line.
    Sep = 2,
    /// Quality line.
    Qual = 3,
}
/// FASTQ reader that parses records directly out of an internal, growable
/// buffer.
///
/// `P` is the buffer policy consulted when a record does not fit into the
/// current buffer.
pub struct Reader<R: io::Read, P = DefaultBufPolicy> {
    // Buffered wrapper around the underlying reader; records are sliced out of its buffer.
    buf_reader: buffer_redux::BufReader<R>,
    // Byte offsets of the current record's lines within the buffer.
    buf_pos: BufferPosition,
    // Line whose end had not been found when the last record search ran out of
    // data, so the search can resume there; `None` when no search is pending.
    incomplete_pos: Option<RecordPos>,
    // Line number and byte offset of the current record within the whole input.
    position: Position,
    // Set once the end of input has been handled or a parse error was reported.
    finished: bool,
    // Decides the new buffer size when the buffer must grow.
    buf_policy: P,
}
impl<R> Reader<R, DefaultBufPolicy>
where
    R: io::Read,
{
    /// Creates a reader with the default initial buffer capacity (`BUFSIZE`,
    /// 64 KiB) and the default buffer policy.
    #[inline]
    pub fn new(reader: R) -> Reader<R, StdPolicy> {
        Self::with_capacity(reader, BUFSIZE)
    }

    /// Creates a reader whose buffer initially holds `capacity` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is smaller than 3.
    #[inline]
    pub fn with_capacity(reader: R, capacity: usize) -> Reader<R, StdPolicy> {
        assert!(capacity >= 3);
        let buf_reader = buffer_redux::BufReader::with_capacity(capacity, reader);
        // A fresh reader starts at line 1, byte 0, with no pending search.
        Reader {
            buf_reader,
            buf_pos: BufferPosition::default(),
            incomplete_pos: None,
            position: Position::new(1, 0),
            finished: false,
            buf_policy: StdPolicy,
        }
    }
}
impl Reader<File, DefaultBufPolicy> {
    /// Opens the file at `path` and wraps it in a reader with default settings.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `File::open` if the file cannot be opened.
    #[inline]
    pub fn from_path<P: AsRef<Path>>(path: P) -> io::Result<Reader<File>> {
        let file = File::open(path)?;
        Ok(Reader::new(file))
    }
}
impl<R, P> Reader<R, P>
where
R: io::Read,
P: BufPolicy,
{
#[inline]
pub fn set_policy<T: BufPolicy>(self, policy: T) -> Reader<R, T> {
Reader {
buf_reader: self.buf_reader,
buf_pos: self.buf_pos,
position: self.position,
incomplete_pos: self.incomplete_pos,
finished: self.finished,
buf_policy: policy,
}
}
#[inline]
pub fn policy(&self) -> &P {
&self.buf_policy
}
#[allow(clippy::should_implement_trait)]
#[inline]
pub fn next(&mut self) -> Option<Result<RefRecord, Error>> {
if try_opt!(self._read_next()) {
return Some(Ok(RefRecord {
buffer: self.get_buf(),
buf_pos: &self.buf_pos,
}))
}
None
}
#[inline]
fn next_owned(&mut self) -> Option<Result<OwnedRecord, Error>> {
self.next().map(|res| res.map(|rec| rec.to_owned_record()))
}
#[inline]
pub fn read_record_set(&mut self, rset: &mut RecordSet) -> Option<Result<(), Error>> {
if try_opt!(self._read_record_set(rset)) {
return Some(Ok(()));
}
None
}
fn get_buf(&self) -> &[u8] {
self.buf_reader.buffer()
}
fn next_pos(&mut self) {
self.position.byte += (self.buf_pos.pos.1 + 1 - self.buf_pos.pos.0) as u64;
self.position.line += 4;
self.buf_pos.pos.0 = self.buf_pos.pos.1 + 1;
}
fn search_record(&mut self) -> Result<bool, Error> {
'sep: loop {
'seq: loop {
'head: loop {
if let Some(pos) = self.incomplete_pos.take() {
match pos {
RecordPos::Head => {}
RecordPos::Seq => break 'head,
RecordPos::Sep => break 'seq,
RecordPos::Qual => break 'sep,
}
}
self.buf_pos.seq = unwrap_or!(self.find_line(self.buf_pos.pos.0), {
self.incomplete_pos = Some(RecordPos::Head);
return Ok(false);
});
}
self.buf_pos.sep = unwrap_or!(self.find_line(self.buf_pos.seq), {
self.incomplete_pos = Some(RecordPos::Seq);
return Ok(false);
});
}
self.buf_pos.qual = unwrap_or!(self.find_line(self.buf_pos.sep), {
self.incomplete_pos = Some(RecordPos::Sep);
return Ok(false);
});
}
self.buf_pos.pos.1 = unwrap_or!(self.find_line(self.buf_pos.qual), {
self.incomplete_pos = Some(RecordPos::Qual);
return Ok(false);
}) - 1;
self.validate()?;
Ok(true)
}
#[inline(always)] fn validate(&mut self) -> Result<(), Error> {
let start_byte = self.get_buf()[self.buf_pos.pos.0];
if start_byte != b'@' {
self.finished = true;
return Err(Error::InvalidStart {
found: start_byte,
pos: self.get_error_pos(0, false),
});
}
let sep_byte = self.get_buf()[self.buf_pos.sep];
if sep_byte != b'+' {
self.finished = true;
return Err(Error::InvalidSep {
found: sep_byte,
pos: self.get_error_pos(2, true),
});
}
let qual_len = self.buf_pos.pos.1 - self.buf_pos.qual + 1;
let seq_len = self.buf_pos.sep - self.buf_pos.seq;
if seq_len != qual_len {
self.finished = true;
return Err(Error::UnequalLengths {
seq: self.buf_pos.seq(self.get_buf()).len(),
qual: self.buf_pos.qual(self.get_buf()).len(),
pos: self.get_error_pos(0, true),
});
}
Ok(())
}
#[inline(never)]
fn get_error_pos(&self, offset: u64, parse_id: bool) -> ErrorPosition {
let id = if parse_id && self.buf_pos.seq - self.buf_pos.pos.0 > 1 {
let id = self
.buf_pos
.head(self.get_buf())
.split(|b| *b == b' ')
.next()
.unwrap();
Some(String::from_utf8_lossy(id).into())
} else {
None
};
ErrorPosition {
line: self.position.line + offset,
id,
}
}
fn find_line(&self, search_start: usize) -> Option<usize> {
memchr(b'\n', &self.get_buf()[search_start..]).map(|pos| search_start + pos + 1)
}
#[inline(never)]
fn next_complete(&mut self) -> Result<bool, Error> {
loop {
if self.get_buf().len() < self.buf_reader.capacity() {
return self.check_end();
} else if self.buf_pos.pos.0 == 0 {
self.grow()?;
} else {
self.make_room();
}
fill_buf(&mut self.buf_reader)?;
if self.find_incomplete()? {
return Ok(true);
}
}
}
#[inline(never)]
fn check_end(&mut self) -> Result<bool, Error> {
self.finished = true;
if self.search_pos == RecordPos::Qual {
self.buf_pos.pos.1 = self.get_buf().len();
self.validate()?;
return Ok(true);
}
let rest = &self.get_buf()[self.buf_pos.pos.0..];
if rest.split(|c| *c == b'\n').all(|l| trim_cr(l).is_empty()) {
return Ok(false);
}
Err(Error::UnexpectedEnd {
pos: self.get_error_pos(self.search_pos as u64, self.search_pos > RecordPos::Head),
})
}
fn grow(&mut self) -> Result<(), Error> {
let cap = self.buf_reader.capacity();
let new_size = self.buf_policy.grow_to(cap).ok_or(Error::BufferLimit)?;
let additional = new_size - cap;
self.buf_reader.reserve(additional);
Ok(())
}
fn make_room(&mut self) {
let consumed = self.buf_pos.pos.0;
self.buf_reader.consume(consumed);
self.buf_reader.make_room();
self.buf_pos.pos.0 = 0;
if self.search_pos >= RecordPos::Seq {
self.buf_pos.seq -= consumed;
}
if self.search_pos >= RecordPos::Sep {
self.buf_pos.sep -= consumed;
}
if self.search_pos >= RecordPos::Qual {
self.buf_pos.qual -= consumed;
}
}
#[inline]
pub fn position(&self) -> &Position {
&self.position
}
#[inline]
pub fn records(&mut self) -> RecordsIter<R, P> {
RecordsIter { rdr: self }
}
#[inline]
pub fn into_records(self) -> RecordsIntoIter<R, P> {
RecordsIntoIter { rdr: self }
}
}
impl<R, P> Reader<R, P>
where
    R: io::Read + Seek,
    P: BufPolicy,
{
    /// Moves the reader to `pos`, so the next read starts with the record at
    /// that position.
    ///
    /// If the target byte is still inside the current buffer, only the buffer
    /// offsets are adjusted. Otherwise the underlying reader is seeked to
    /// `pos.byte`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` if seeking the underlying reader fails.
    #[inline]
    pub fn seek(&mut self, pos: &Position) -> Result<(), Error> {
        self.finished = false;
        // A partially located record refers to the old position. Drop the
        // stale search stage so the next search starts at a fresh header line
        // rather than resuming with outdated line offsets.
        self.incomplete_pos = None;
        let diff = pos.byte as i64 - self.position.byte as i64;
        let endpos = self.buf_pos.pos.0 as i64 + diff;
        self.position = pos.clone();
        if endpos >= 0 && endpos < (self.get_buf().len() as i64) {
            self.buf_pos.reset(endpos as usize);
            return Ok(());
        }
        self.buf_reader.seek(io::SeekFrom::Start(pos.byte))?;
        self.buf_pos.reset(0);
        Ok(())
    }
}
/// Iterator that borrows a [`Reader`] and yields each record as an
/// [`OwnedRecord`]; created by `Reader::records`.
pub struct RecordsIter<'a, R: io::Read + 'a, P: 'a = DefaultBufPolicy> {
    rdr: &'a mut Reader<R, P>,
}
impl<'a, R, P> Iterator for RecordsIter<'a, R, P>
where
    P: BufPolicy + 'a,
    R: io::Read + 'a,
{
    type Item = Result<OwnedRecord, Error>;

    /// Reads the next record from the borrowed reader and copies it out of
    /// the reader's buffer.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.rdr.next()? {
            Ok(record) => Some(Ok(record.to_owned_record())),
            Err(err) => Some(Err(err)),
        }
    }
}
/// Iterator that owns a [`Reader`] and yields each record as an
/// [`OwnedRecord`]; created by `Reader::into_records`.
pub struct RecordsIntoIter<R, P = DefaultBufPolicy>
where
    R: io::Read,
{
    rdr: Reader<R, P>,
}
impl<R, P> Iterator for RecordsIntoIter<R, P>
where
    P: BufPolicy,
    R: io::Read,
{
    type Item = Result<OwnedRecord, Error>;

    /// Reads the next record from the owned reader and copies it out of the
    /// reader's buffer.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let result = self.rdr.next()?;
        Some(result.map(|record| record.to_owned_record()))
    }
}
/// Location of a record in the input: a line number and a byte offset.
///
/// The derived ordering compares `line` first and then `byte`, following the
/// field declaration order.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Position {
    line: u64,
    byte: u64,
}

impl Position {
    /// Creates a position from a line number and a byte offset.
    #[inline]
    pub fn new(line: u64, byte: u64) -> Self {
        Position { byte, line }
    }

    /// Line number of the record start.
    #[inline]
    pub fn line(&self) -> u64 {
        self.line
    }

    /// Byte offset of the record start.
    #[inline]
    pub fn byte(&self) -> u64 {
        self.byte
    }
}
/// Errors returned while reading FASTQ input.
#[derive(Debug)]
pub enum Error {
    /// An I/O error while reading from or seeking the underlying reader.
    Io(io::Error),
    /// The sequence and quality lines of a record differ in length.
    UnequalLengths {
        /// Length of the sequence line (without line terminator).
        seq: usize,
        /// Length of the quality line (without line terminator).
        qual: usize,
        /// Where the offending record was found.
        pos: ErrorPosition,
    },
    /// A record did not start with `@`.
    InvalidStart {
        /// The byte found instead of `@`.
        found: u8,
        /// Where the offending record was found.
        pos: ErrorPosition,
    },
    /// A record's separator line did not start with `+`.
    InvalidSep {
        /// The byte found instead of `+`.
        found: u8,
        /// Where the offending record was found.
        pos: ErrorPosition,
    },
    /// The input ended in the middle of a record.
    UnexpectedEnd {
        /// Where the truncated record was found.
        pos: ErrorPosition,
    },
    /// The buffer policy refused to grow the buffer any further.
    BufferLimit,
}
/// Where a parse error was detected: a line number and, when it could be
/// determined, the ID of the affected record.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ErrorPosition {
    /// Line number at which the error was detected.
    pub line: u64,
    /// ID of the affected record, if known.
    pub id: Option<String>,
}

impl fmt::Display for ErrorPosition {
    /// Formats as `record '<id>' at line <n>`, or just `line <n>` when the
    /// record ID is unknown.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self.id {
            Some(ref record_id) => write!(f, "record '{}' at line {}", record_id, self.line),
            None => write!(f, "line {}", self.line),
        }
    }
}
impl fmt::Display for Error {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Io(ref e) => e.fmt(f),
Error::UnequalLengths { seq, qual, ref pos } => write!(
f,
"FASTQ parse error: sequence length is {}, but quality length is {} ({}).",
seq, qual, pos
),
Error::InvalidStart { found, ref pos } => write!(
f,
"FASTQ parse error: expected '@' at record start but found '{}' ({}).",
(found as char).escape_default(),
pos
),
Error::InvalidSep { found, ref pos } => write!(
f,
"FASTQ parse error: Expected '+' separator but found '{}' ({}).",
(found as char).escape_default(),
pos
),
Error::UnexpectedEnd { ref pos } => {
write!(f, "FASTQ parse error: unexpected end of input ({}).", pos)
}
Error::BufferLimit => write!(f, "FASTQ parse error: Buffer limit reached."),
}
}
}
impl From<io::Error> for Error {
#[inline]
fn from(e: io::Error) -> Error {
Error::Io(e)
}
}
impl StdError for Error {
#[inline]
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match *self {
Error::Io(ref err) => Some(err),
_ => None,
}
}
}
/// Byte offsets of one record's lines inside a buffer.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct BufferPosition {
    // (start of the record, i.e. its '@'; index of the record's final '\n', or
    // the buffer length for an unterminated last record). `pos.1 == 0` marks a
    // record whose end has not been located yet.
    pos: (usize, usize),
    // Start of the sequence line.
    seq: usize,
    // Start of the separator ('+') line.
    sep: usize,
    // Start of the quality line.
    qual: usize,
}
impl BufferPosition {
    /// `true` while no record end has been located (`pos.1 == 0`), e.g. right
    /// after [`reset`](BufferPosition::reset).
    #[inline]
    fn is_new(&self) -> bool {
        let (_, end) = self.pos;
        end == 0
    }

    /// Sets the record start to `start` and clears the record end.
    #[inline]
    fn reset(&mut self, start: usize) {
        self.pos = (start, 0);
    }

    /// Header line without the leading `@` and the newline, passed through
    /// `trim_cr` (presumably drops a trailing `\r`).
    #[inline]
    fn head<'a>(&'a self, buffer: &'a [u8]) -> &'a [u8] {
        let from = self.pos.0 + 1;
        let to = self.seq - 1;
        trim_cr(&buffer[from..to])
    }

    /// Sequence line without the newline, passed through `trim_cr`.
    #[inline]
    fn seq<'a>(&'a self, buffer: &'a [u8]) -> &'a [u8] {
        let line = &buffer[self.seq..self.sep - 1];
        trim_cr(line)
    }

    /// Quality line up to the record end, passed through `trim_cr`.
    #[inline]
    fn qual<'a>(&'a self, buffer: &'a [u8]) -> &'a [u8] {
        let (_, end) = self.pos;
        let line = &buffer[self.qual..end];
        trim_cr(line)
    }
}
/// Read access to the parts of a FASTQ record.
///
/// Implementors provide the header (without the leading `@`), sequence and
/// quality lines; the remaining methods are derived from those.
pub trait Record {
    /// Header line without the leading `@`.
    fn head(&self) -> &[u8];
    /// Sequence line.
    fn seq(&self) -> &[u8];
    /// Quality line.
    fn qual(&self) -> &[u8];

    /// Header bytes up to (not including) the first space.
    #[inline]
    fn id_bytes(&self) -> &[u8] {
        let mut fields = self.head().split(|&b| b == b' ');
        // `split` always yields at least one item.
        fields.next().unwrap()
    }

    /// The record ID as UTF-8.
    ///
    /// # Errors
    ///
    /// Returns `Utf8Error` if the ID is not valid UTF-8.
    #[inline]
    fn id(&self) -> Result<&str, Utf8Error> {
        let bytes = self.id_bytes();
        str::from_utf8(bytes)
    }

    /// Header bytes after the first space, or `None` if there is no space.
    #[inline]
    fn desc_bytes(&self) -> Option<&[u8]> {
        let mut fields = self.head().splitn(2, |&b| b == b' ');
        fields.next();
        fields.next()
    }

    /// The description as UTF-8, or `None` if the header has no description.
    #[inline]
    fn desc(&self) -> Option<Result<&str, Utf8Error>> {
        let bytes = self.desc_bytes()?;
        Some(str::from_utf8(bytes))
    }

    /// ID and optional description, split at the first space.
    #[inline]
    fn id_desc_bytes(&self) -> (&[u8], Option<&[u8]>) {
        let mut fields = self.head().splitn(2, |&b| b == b' ');
        // `splitn` always yields at least one item.
        let id = fields.next().unwrap();
        (id, fields.next())
    }

    /// ID and optional description as UTF-8, split at the first space.
    ///
    /// # Errors
    ///
    /// Returns `Utf8Error` if the header is not valid UTF-8.
    #[inline]
    fn id_desc(&self) -> Result<(&str, Option<&str>), Utf8Error> {
        let head = str::from_utf8(self.head())?;
        let mut fields = head.splitn(2, ' ');
        let id = fields.next().unwrap();
        Ok((id, fields.next()))
    }

    /// Writes the record in FASTQ format using [`write_to`].
    #[inline]
    fn write<W: io::Write>(&self, writer: W) -> io::Result<()> {
        let (head, seq, qual) = (self.head(), self.seq(), self.qual());
        write_to(writer, head, seq, qual)
    }
}
/// A FASTQ record that borrows its data from a reader's or a record set's
/// buffer.
#[derive(Debug, Clone)]
pub struct RefRecord<'a> {
    // Raw bytes that the offsets in `buf_pos` refer to.
    buffer: &'a [u8],
    // Offsets of this record's lines within `buffer`.
    buf_pos: &'a BufferPosition,
}
impl<'a> Record for RefRecord<'a> {
#[inline]
fn head(&self) -> &[u8] {
self.buf_pos.head(self.buffer)
}
#[inline]
fn seq(&self) -> &[u8] {
self.buf_pos.seq(self.buffer)
}
#[inline]
fn qual(&self) -> &[u8] {
self.buf_pos.qual(self.buffer)
}
}
impl<'a> RefRecord<'a> {
    /// Copies the header, sequence and quality lines into an [`OwnedRecord`].
    #[inline]
    pub fn to_owned_record(&self) -> OwnedRecord {
        let head = self.head().to_vec();
        let seq = self.seq().to_vec();
        let qual = self.qual().to_vec();
        OwnedRecord { head, seq, qual }
    }

    /// Writes the record's raw bytes exactly as they appear in the buffer
    /// (no trimming), followed by a `\n`.
    #[inline]
    pub fn write_unchanged<W: io::Write>(&self, mut writer: W) -> io::Result<()> {
        // `end` is the index of the final newline (or the buffer end), so the
        // newline itself is written separately.
        let (start, end) = self.buf_pos.pos;
        writer.write_all(&self.buffer[start..end])?;
        writer.write_all(b"\n")
    }
}
/// A FASTQ record that owns copies of its header, sequence and quality lines.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OwnedRecord {
    /// Header line without the leading `@`.
    pub head: Vec<u8>,
    /// Sequence line.
    pub seq: Vec<u8>,
    /// Quality line.
    pub qual: Vec<u8>,
}
impl Record for OwnedRecord {
    /// The stored header bytes.
    #[inline]
    fn head(&self) -> &[u8] {
        self.head.as_slice()
    }

    /// The stored sequence bytes.
    #[inline]
    fn seq(&self) -> &[u8] {
        self.seq.as_slice()
    }

    /// The stored quality bytes.
    #[inline]
    fn qual(&self) -> &[u8] {
        self.qual.as_slice()
    }
}
/// A batch of records filled by `Reader::read_record_set`. Iterate over
/// `&RecordSet` to get [`RefRecord`]s.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct RecordSet {
    // Raw bytes that all positions in `buf_positions` refer to.
    buffer: Vec<u8>,
    // Line offsets of each record within `buffer`.
    buf_positions: Vec<BufferPosition>,
}
impl<'a> iter::IntoIterator for &'a RecordSet {
    type Item = RefRecord<'a>;
    type IntoIter = RecordSetIter<'a>;

    /// Iterates over the records of the set, borrowing its buffer.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let buffer: &'a [u8] = self.buffer.as_slice();
        let pos = self.buf_positions.iter();
        RecordSetIter { buffer, pos }
    }
}
/// Iterator over the records of a [`RecordSet`], yielding [`RefRecord`]s
/// that borrow the set's buffer.
pub struct RecordSetIter<'a> {
    // Offsets of the records not yet yielded.
    pos: slice::Iter<'a, BufferPosition>,
    // Raw bytes the offsets refer to.
    buffer: &'a [u8],
}
impl<'a> Iterator for RecordSetIter<'a> {
    type Item = RefRecord<'a>;

    /// Pairs the next record's offsets with the shared buffer.
    #[inline]
    fn next(&mut self) -> Option<RefRecord<'a>> {
        let buf_pos = self.pos.next()?;
        Some(RefRecord {
            buffer: self.buffer,
            buf_pos,
        })
    }
}
/// Writes one FASTQ record to `writer` as four lines: `@head`, `seq`, a bare
/// `+` separator, and `qual`, each terminated by `\n`.
///
/// # Errors
///
/// Returns the first error reported by `writer`.
#[inline]
pub fn write_to<W: io::Write>(
    mut writer: W,
    head: &[u8],
    seq: &[u8],
    qual: &[u8],
) -> io::Result<()> {
    // Written piece by piece, in order, without building an intermediate buffer.
    let pieces: [&[u8]; 7] = [b"@", head, b"\n", seq, b"\n+\n", qual, b"\n"];
    for piece in pieces.iter() {
        writer.write_all(piece)?;
    }
    Ok(())
}
/// Writes one FASTQ record to `writer`, building the header from `id` and an
/// optional description separated by a single space.
///
/// # Errors
///
/// Returns the first error reported by `writer`.
#[inline]
pub fn write_parts<W: io::Write>(
    mut writer: W,
    id: &[u8],
    desc: Option<&[u8]>,
    seq: &[u8],
    qual: &[u8],
) -> io::Result<()> {
    writer.write_all(b"@")?;
    writer.write_all(id)?;
    if let Some(description) = desc {
        writer.write_all(b" ")?;
        writer.write_all(description)?;
    }
    let rest: [&[u8]; 5] = [b"\n", seq, b"\n+\n", qual, b"\n"];
    for piece in rest.iter() {
        writer.write_all(piece)?;
    }
    Ok(())
}